Skip to main content

Kafka & Zookeeper 基础知识

kafka简介

Kafka最早诞生是为了解决Linkedin的data pipeline问题。

log

数据处理最重要的是有一个完整的数据流,而不是数据模型

日志充当了一个缓存作用,使得读写可以异步

Data which is collected in batch is naturally processed in batch. When data is collected continuously, it is naturally processed continuously.

kdg2 0102

kdg2 0103

使用Kafka的优势是显而易见的:不需要再去做各种各样的routing和wiring,就能将metrics通过统一的平台进行统计、分析和分发。

Schema

Kafka的消息都是字节数组,但是也是有结构定义的。

Stream

kdg2 0105

Stream指的是同一个Topic的消息,和Partition的数量无关。同一个消息可以有多个Partition,也可以分散在多个节点上,提升性能和冗余度。

Producer Consumer

Producer指的就是消息的生产者,通常会平均地向每个Partition分摊消息。但也有特殊情况,生产者只会向特定的Partition发送消息。

Consumer读取消息,订阅一个或多个Topic,按顺序读取消息。

Offset

Consumer和Kafka都保留了一个Offset,一个不完全单调递增的int,后来的消息offset更大。每个partition的offset都被保存着,这样Consumer暂停接受消息后,就可以resume。

Consumer group

一个或多个Consumer共同消费同一个topic。Consumer group确保每个partition只会被一个member消费。这也被称为Ownership。

kdg2 0106

如果一个member fail了,那个partition会被重新分配到剩余的members中。

Brokers and Clusters

kdg2 0107

Broker从producer接受消息,设置offset,然后写入存储。消息有主从同步。

消息有存储限制和时长限制,超出了就会被删掉。

MirrorMaker

kdg2 0108

灾备的MirrorMaker本身就是一个consumer和producer,consume其他集群的消息然后produce到本地集群。

优势

多Producer,多Consumer,本地留存。

高性能,高可用

生态

kdg2 0109

Zookeeper

消息队列的挑战

消息延迟

​ P比Q先发送消息,但Q的消息先到

处理速度

​ T~ps~ + T~s~ + T~pr~ 时延 = 发送+传输+接受

时钟对齐

​ 处理器时钟有可能不准,或者未对齐

​ 消息没收到,有可能是三种情况的任意一种

主从结构

主节点crash

​ 需要备用Master,新的Master能够从crash时的状态恢复。此时,我们不能指望从crash掉的master那里读取恢复时的context,所以需要把数据存在别的地方,就是zookeeper里。

split-brain 一个系统中两个或多个部分独立地完成工作,比如master实际在工作,但由于网络故障,集群被划分为了两半。

Worker crash

​ 重新分配调度给Worker的任务。任务可能执行了一半,也可能已经完成但无法汇报结果。如果有Side effect,那么还需要清理状态。

通讯故障

​ 锁不能解决问题

具体功能

Master election

​ 拥有master才具有分配任务的功能,所以必须能够选举

Crash detection

​ Master能够检测到worker crash

Group membership management

​ Master能够得知哪些节点能够执行任务

Metadata management

​ 主节点和从节点都能够存储分配和执行状态

Zookeeper 并不是 Byzantine fault tolerant的,因为这会大大增加复杂性和开销。

Fischer, Lynch, and Patterson FLP

异步通信,并且会故障的进程,并没有办法对哪怕1bit的信息达成共识。

Impossibility of Distributed Consensus with One Faulty Process, 1983

ZNode tree

ZooKeeper data tree example.

Znode的所有信息都被存在一棵树里,包括Master,Workers,Tasks和Assign。

如果有数据缺失,就说明有地方出错了。比如Master这里的值是空,就说明需要进行Leader election。

事物原子性

Zookeeper有API可以对数据进行读写,但读或者写操作都是原子性的。

Polling vs Notification

Polling:

Multiple reads to the same znode./img/foo

可以看到,操作2是不必要的。

Notification:

Using notifications to indicate changes to a znode

Version Number

每个Znode都有一个版本号

Using versions to prevent inconsistencies due to concurrent updates

与Application 交互

ZooKeeper Architecture Overview.

Standalone mode:一个Zookeeper server

Quorum mode: 多个servers,states replicate

多数server确认状态被复制,才能成功更新状态,避免split-brain

Leader Election

Looking state,Follower state 和 Leader state三种状态

发现Leader 挂掉,马上进入looking state,然后互相给每个resemble 里的server发消息,是一个tuple包含自己的serverID(sID)和上一个请求的transaction ID(zxID)。然后对比,选出最新处理请求的当选leader

Leader election illustration.

但是这样做是建立在选举时网络条件很好的情况下。

Leader election illustration.

如果网络有延迟,就会出现这样的情况。因此,可以加上一个超时,设定等待的最长时间。Fast Election就设置了200ms

Zab: the ZooKeeper Atomic Broadcast protocol

  • 首先,Leader会向所有Follower发送一个Proposal 信息
  • Follower向Leader 发送 ACK
  • Leader 向Follower 发送 Commit

Regular message pattern to commit proposals.

这样做能确保事务的处理顺序,并且不遗漏事务。

Epoch:代表Leader的term,每次leader election都会增加epoch,每次transaction都会包含epoch

DIFF:转换Epoch时,如果相差不多,Leader会发送Follower缺失的transaction。

SNAP:相差很大,Leader会发送全量的snapshot

Observers

不参与投票,不参与proposal

Illustration of the leader server pipeline.

Master在处理完Proposal后,会同时处理Commit以及持久化到本地

回到Kafka,kafka的broker就是在zookeeper中以/brokers/ids的形式注册的。

由于ephemeral node的特性,当broker失联了,session断开,它的节点就会被zookeeper自动移除。

Controller

broker+ partition leader election,每个cluster只会同时有一个controller。

KRaft

2019年开始的项目

现有问题:

  • Metadata同步写入ZooKeeper,分发给broker却是异步。因此存在broker,controller,zookeeper三者状态不一致的情况
  • 当controller重启时,需要重新从zookeeper读全量的状态数据,非常慢。
  • Ownership不明确,同样的数据有些是controller改的,有些是broker改的,有些是zookeeper改的
  • Zookeeper本身也是一个分布式系统

解决方法:

KRaft,将controller分为 active controller 和follower controllers,active controller负责处理broker事务。

Controller nodes自己leader election,原本存储在zookeeper的数据存在一个log中。

KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum - Apache Kafka - Apache Software Foundation

KIP-595: A Raft Protocol for the Metadata Quorum - Apache Kafka - Apache Software Foundation

KIP-631: The Quorum-based Kafka Controller - Apache Kafka - Apache Software Foundation

Partition Replicas

每个Topic 可以有多个Partition,而每个Partition可以有多个replica。

Replica也分主从,主负责处理client request,从复制消息并且作为备份。

KIP-392之后,follower partition也可以处理client request。

KIP-392: Allow consumers to fetch from closest replica - Apache Kafka - Apache Software Foundation

Requests

Apache Kafka, Purgatory, and Hierarchical Timing Wheels | Confluent

Request type (also called API key)

Request version (so the brokers can handle clients of different versions and respond accordingly)

Correlation ID: a number that uniquely identifies the request and also appears in the response and in the error logs (the ID is used for troubleshooting)

Client ID: used to identify the application that sent the request

Apache Kafka

由于Client并不知道要向哪个partition发消息,所以需要metadata request,然后broker返回topic,partition以及leader的信息。

kdg2 0602

Produce request

Acks=<0,1,all>

不等待,只等待leader,等待所有replica sync

Fetch request

zero-copy

区别:

img

img

boundary

Lower boundary: Only return results once you have at least 10K bytes to send me

Upper boundary: Chunk

Partition Allocation

  • 在Broker中平均分配
  • 每个Partition的各个replica在不同broker中
  • 机柜数据(0.10.0版本之后支持)

kdg2 0605

File

分段,比如1G或者一周的数据

删除时不能删除active segment

Indexes

每个partition都有一个index,对offset和文件位置作映射

Compact

kdg2 0607

Producers Retry

message will be stored at least once, but not exactly once.

当然,也可以配置enable.idempotence,让broker跳过重复的消息。

Producer需要能够handle Non-retriable errors

Consumer

从Partition fetch一个batch,检查最后的offset,然后发出新的request 从最后的offset开始申请batch。

由于consumer可能会出错,或者停止,需要有一种手段来恢复到上一次consume的offset。这就需要consumer去commit offset。

对于每一个Partition,consumer都会存储当前的位置。当consumer确认收到并且处理完message后,才会向kafka commit offset。

Writing

kdg2 0301

timing配置

kdg2 0302

max.block.ms

producer调用send()之后,由于send buffer满了,或者metadata取不到,这时设置一个超时抛异常。

delivery.timeout.ms

request.timeout.ms

这个很好理解,就是请求发出去了但是server无应答,超时抛异常。

retry.backoff.ms

默认每隔100ms进行一次retry,但是通过这个参数可以进行调整。

linger.ms

producer会在两种情况下发送batch:

  • batch已经写满了消息

  • 消息没写满,但是达到了linger.ms设置的等待时长

batch.size

Batch在内存中的字节长度

max.in.flight.requests.per.connection

在未收到response的情况下,最多发多少个请求。这个参数可以控制吞吐量,默认是5

in flight request如果大于1,那么有可能出现第一个batch失败,第二个成功的情况。

对于一些比如银行等业务,这个是不可接受的。但是小于2又会有性能问题。

enable.idempotence=true

message ordering <= 5时能够保证消息的顺序。

Quotas
quota.producer.default=2M

设置producer/consumer/request的 rate limit

但这个选项可能会导致消息囤积在producer的缓冲区,导致存储溢出

Reading

kdg2 0401

如果一个Consumer group里只有一个consumer,它会接收到所有Partition的消息。

如果一个Consumer group里的consumer数量大于partition数量,有的consumer就接收不到消息了。

kdg2 0404

如果存在多个Consumer group,多个group之间是互相独立的。

kdg2 0405

Rebalance

将partition的ownership从一个consumer转移到另一个consumer,称为rebalance.

Eager rebalance

kdg2 0406

所有consumer暂停消费消息,放弃ownership,重新加入consumer group,等待broker重新分配ownership。

Cooperative rebalance(incremental rebalance)

Consumer group leader会通知其他consumer,哪些partition会被重新安排。own这些Partiton的consumer会停止消费这些partition的消息,然后consumer group leader将这些partition重新分配给目标consumer。

kdg2 0407

通过发送心跳包来确认consumer在线。

第一个加入Consumer group的成为group leader。

Static group membership

consumer也可以配置为静态的成员,这样当consumer断开连接后,它的partition不会被rearrange,而是等待它上线后恢复消费。

max.poll.interval.ms

尽管kafka通过background thread心跳包来确认consumer是否在线,但是在线并不代表它能够正常处理消息。

可以通过这个值来控制,如果client超过这个时间还没有发送第二次poll(),那么认为它的线程已经dead了。

默认是5分钟

auto.offset.reset

有latest和earliest两种,控制没有offset时是读取最新消息还是全量消息。

partition.assignment.strategy

Range

partition除以consumer count后,得到连续的编号范围,分配给consumer。

RoundRobin

把所有的partition one-by-one分配给consumers

Sticky

比RoundRobin更加平衡,在Reassign的时候减少了Rebalance的次数

Cooperative Sticky

支持Cooperative Rebalance

offsets.retention.minutes

Group里的消息被消费完了,默认保留7天,然后offset会清零,consumer group会和新加入的一样。

Offsets

kdg2 0408

如上图,最坏的情况:如果BatchSize=5,上一个Batch处理完了,收到下一个Batch。但是Commit Offset=7没有发出去,然后处理到10的时候consumer挂了。这个时候,下一个接替的consumer会从2开始处理,导致从3~10这几个都被重复处理了。

解决办法就是manual commit offset,调用API处理完一条消息后就commit。

commitAsync没有retry,commitSync有retry。

当然,增加Specified commit会减少出错时重复处理的消息数量,但也会降低Throughput。

atomic multipartition writes

kdg2 0801

Consume,Process,Produce三步形成一个原子操作,就是让Produce时同时更新数据和Offset,要么都无法更新要么都可以更新。

kdg2 0802

Last Stable Offset, or LSO

安装

Mac

安装openjdk

brew install openjdk

然后开启zookeeper

zookeeper-server-start /opt/homebrew/Cellar/kafka/3.2.0/libexec/config/zookeeper.properties

开启kafka

Enable JMX

ITOM Practitioner Portal (microfocus.com)

env JMX_PORT=<portnumber> kafka-server-start /opt/homebrew/Cellar/kafka/3.2.0/libexec/config/server.properties

创建topic

kafka-topics --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

输出: Created topic test.

(base) ➜  ~ kafka-topics --bootstrap-server localhost:9092 --describe --topic test
Topic: test TopicId: cqJ9A01WRpOhwcD1sBLKTA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(base) ➜ ~

测试Consumer 和 Producer

(base) ➜  ~ kafka-console-producer --bootstrap-server localhost:9092 --topic test
>Hello Kafka
>Bye
>^C%
(base) ➜ ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic test
^CProcessed a total of 0 messages
(base) ➜ ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello Kafka
Bye
^CProcessed a total of 2 messages
(base) ➜ ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello Kafka
Bye
^CProcessed a total of 2 messages
(base) ➜ ~

这里可以用zkcli看到,kafka已经在更新zookeeper的状态:

[zk: localhost:2181(CONNECTED) 2] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 3] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, test]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test/partitions
[0]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/test/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 4, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 5, 6, 7, 8, 9]

我们可以用cmak工具来看:

https://github.com/yahoo/CMAK

下载下来后,修改application.conf文件

cmak.zkhosts="0.0.0.0:2181"

然后启动,注意不要用jdk16以上的版本,加载css会报错,应该是个bug

bin/cmak -java-home /opt/homebrew/opt/openjdk@11/

然后访问localhost:9000

添加cluster, zk 设置为localhost:9000

可以看到topics,brokers

点进topics

image-20220821104616084

如果enable了JMX,我们就可以看到metrics了。

比如起一个console,作为consumer,一个作为producer

➜ ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic test

~ kafka-console-producer --bootstrap-server localhost:9092 --topic test
>hello
>i'm fine

发现consumer那边收到了消息:

➜  ~ kafka-console-consumer --bootstrap-server localhost:9092 --topic test
hello
i'm fine

然后回到CMAK,刷新,发现metrics有变化

image-20220821110425179